package com.xiaofan.apitest.window

import com.xiaofan.apitest.source.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WatermarkTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(50)
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream("192.168.1.27", 9999)

    val dataStream: DataStream[SensorReading] = inputStream.map(
      data => {
        val arr: Array[String] = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
    ) // .assignAscendingTimestamps(_.timestamp)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
        // ms
        override def extractTimestamp(element: SensorReading) = element.timestamp  * 1000L
      })

    val lateTag = new OutputTag[SensorReading]("late")

    // 每15秒统计一次，窗口内各传感器所有温度的最小值
      val resultStream: DataStream[SensorReading] = dataStream
      .keyBy(_.id)
      .timeWindow(Time.seconds(15), Time.seconds(2))
      .allowedLateness(Time.minutes(1))
      .sideOutputLateData(lateTag)
      // .minBy("temperature") 温度最小的那条记录及相关属性
      .reduce { (curState, newData) => SensorReading(curState.id, newData.timestamp, curState.temperature.min(newData.temperature)) } // 温度最小值，及最新时间

    resultStream.getSideOutput(lateTag).print("late")

    resultStream.print("result")

    env.execute("watermark test")


  }
}
